package com.atguigu.app.func;

import com.alibaba.druid.pool.DruidDataSource;
import com.alibaba.druid.pool.DruidPooledConnection;
import com.alibaba.fastjson.JSONObject;
import com.atguigu.common.GmallConfig;
import com.atguigu.utils.DruidDSUtil;
import com.atguigu.utils.JedisUtil;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import redis.clients.jedis.Jedis;

import java.sql.PreparedStatement;
import java.util.Collection;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;

public class DimSinkFunction extends RichSinkFunction<JSONObject> {

    private DruidDataSource druidDataSource;

    @Override
    public void open(Configuration parameters) throws Exception {
        druidDataSource = DruidDSUtil.getDruidDataSource();
    }

    //Value:{"database":"gmall-220623-flink","table":"comment_info","type":"insert","ts":1669162958,"xid":1111,"xoffset":13941,"data":{"id":1595211185799847960,"user_id":119,"nick_name":null,"head_img":null,"sku_id":31,"spu_id":10,"order_id":987,"appraise":"1204","comment_txt":"评论内容：48384811984748167197482849234338563286217912223261","create_time":"2022-08-02 08:22:38","operate_time":null},"sink_table":"dim_xxx_xxx"}
    @Override
    public void invoke(JSONObject value, Context context) throws Exception {

        //获取连接
        DruidPooledConnection connection = druidDataSource.getConnection();

        //拼接SQL语句:upsert into db.tn(id,name,sex) values('1001','zhangsan','male')
        String sink_table = value.getString("sink_table");
        JSONObject data = value.getJSONObject("data");
        String sql = genUpsertSql(sink_table, data);
        System.out.println("SQL:" + sql);

        //判断如果为更新数据,则先将数据写到Redis
        if ("update".equals(value.getString("type"))) {
            Jedis jedis = JedisUtil.getJedis();

            String redisKey = "DIM:" + sink_table.toUpperCase() + ":" + data.getString("id");
            System.out.println(redisKey);

            JSONObject jsonObject = new JSONObject();

            Set<Map.Entry<String, Object>> entries = data.entrySet();
            for (Map.Entry<String, Object> entry : entries) {
                jsonObject.put(entry.getKey().toUpperCase(), entry.getValue().toString());
            }

            jedis.set(redisKey, jsonObject.toJSONString());
            jedis.expire(redisKey, 24 * 3600);

            jedis.close();
        }

        //将数据写出
        PreparedStatement preparedStatement = connection.prepareStatement(sql);
        preparedStatement.execute();
        connection.commit();

        //释放资源
        connection.close();
        preparedStatement.close();

    }

    //拼接SQL语句:upsert into db.tn(id,name,sex) values('1001','zhangsan','male')
    //data:{"id":"1001","name":"zhangsan","sex":"male"}
    private String genUpsertSql(String sinkTable, JSONObject data) {

        //取出列名和列值
        Set<String> columns = data.keySet();//sex,id,name
        Collection<Object> values = data.values();//male,1001,zhangsan

        //StringUtils.join(columns, ",")   ===>  sex,id,name
        //StringUtils.join(values, "','")    ===>  male','1001','zhangsan

        return "upsert into " + GmallConfig.PHOENIX_DB + "." + sinkTable +
                "(" + StringUtils.join(columns, ",") + ") values('" +
                StringUtils.join(values, "','") + "')";
    }
}
